
feat: tedge connect to c8y mqtt service endpoint #3707

Open · wants to merge 18 commits into main
Conversation

albinsuresh
Contributor

@albinsuresh albinsuresh commented Jun 25, 2025

Proposed changes

  • Specification (with multiple approaches) for connecting thin-edge to the new Cumulocity MQTT service endpoint.
  • Implementation of the finalised approach (connect to the mqtt service along with core mqtt)
  • mosquitto bridge support
  • builtin bridge support

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue


Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s. You can activate automatic signing by running just prepare-dev once)
  • I ran just format as mentioned in CODING_GUIDELINES
  • I used just check as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments


codecov bot commented Jun 25, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅


@albinsuresh albinsuresh force-pushed the feat/c8y-mqtt-service-connect branch from a332515 to 6e25aff Compare June 26, 2025 06:34
@albinsuresh albinsuresh marked this pull request as ready for review June 26, 2025 06:34
@reubenmiller
Contributor

reubenmiller commented Jun 26, 2025

Given that there is a clear vision that the Cumulocity mqtt-service will replace the core MQTT functionality, approach 1 is probably the best for delivering the immediate "mqtt-service support" feature, as it would also allow a smooth transition to using only the mqtt-service without changing much on the device side, e.g. tedge connect c8y would keep the same UX but simply do something different behind the scenes.

Though I still think we should explore supporting the connection to any MQTT broker by allowing option 2, we could do that specific work in a separate feature implementation.

A generic feature could look very similar to what is being proposed, but you could also specify a "type":

[bridge.custom]     # where "custom" is the unique id of the mapper
type = "generic | c8y (mqtt-core) | c8y-mqtt-service | aws | az"        # type control default mapping rules and any type specific connection logic
device.key_path = ""
device.cert_path = ""
bridge.username = "${cert.Issuer.CN}"   # reference meta info from the device's certificate
bridge.topic_prefix = "${bridge.id}"    # reference its own id, custom

# control which subscriptions the bridge will subscribe to
remote_subscriptions = [
    "3"
]

# type specific settings (e.g. for the c8y proxy)
c8y.proxy.bind.port = 8001

# future properties
pipelines_dir = "/etc/tedge/pipelines/${bridge.id}"      # specify which mapping/filtering rules should be used (once we have a generic mapper)

@albinsuresh albinsuresh force-pushed the feat/c8y-mqtt-service-connect branch from 6e25aff to 9979965 Compare July 3, 2025 08:31
@albinsuresh albinsuresh changed the title from "spec: c8y mqtt service connection spec" to "feat: tedge connect to c8y mqtt service endpoint" Jul 3, 2025
@albinsuresh albinsuresh requested a review from a team as a code owner July 4, 2025 13:36
Execute Command tedge config set c8y.tenant_id t37070943
Execute Command tedge config set c8y.mqtt_service.enabled true
Execute Command tedge config set c8y.mqtt_service.topics 'sub/topic,demo/topic'
Execute Command tedge connect c8y
Contributor

How can I check that the connection to the MQTT service is actually established?

  • Neither tedge connect c8y nor tedge connect c8y --test says a word about the MQTT service and its status.
  • I deliberately tried to break the config; no errors are returned by tedge connect.

Good. The bridge status published on te/device/main/service/mosquitto-c8y-mqtt-bridge/status/health is working and reports broken settings as well as correct ones.

=> I would add a test assertion that a status message of 1 is received on this topic.

Execute Command tedge config set mqtt.bridge.built_in true
Execute Command tedge config set c8y.tenant_id t37070943
Execute Command tedge config set c8y.mqtt_service.enabled true
Execute Command tedge config set c8y.mqtt_service.topics 'sub/topic,demo/topic'
Contributor Author

There's something really weird with multiple subscriptions on the MQTT service: it rejects subscription requests with multiple topics from the built-in bridge, which is why this test case was failing. Here is the same behaviour reproduced with the mosquitto_sub client:

$ mosquitto_sub -v -h albin.latest.stage.c8y.io -p 9883 -i albin-123 --key device.key --cert device.crt --cafile c8y-root.crt -u t37070943 -t sub/topic -t demo/topic
All subscription requests were denied.

Once you limit the subscription to a single topic, it works. This test also works when there is only one subscription topic.

But to my surprise, the mosquitto bridge with multiple subscriptions works just fine (as done in the first test case).

}

// Templates
tc.forward_from_local("#", local_prefix.clone(), "")?;
Contributor Author

Unfortunately this bridge rule triggers an infinite loop. The forward_from_local rule bridges anything received on c8y-mqtt/# to the cloud, and the forward_from_remote rule above bridges anything received on the subscribed topics to local topics with the c8y/ prefix. Together they create an infinite loop when a message is received from the cloud.

But again, the mosquitto bridge somehow detects and handles these cycles, while the built-in bridge doesn't. So either we should improve the built-in bridge to handle them, or enforce one of the following rules for the mqtt service bridge:

  1. Use different prefixes to bridge outbound and inbound messages
  2. Do not bridge all published messages on c8y-mqtt/#, but make the user explicitly specify the publish topics to be bridged as well.

Contributor

There is some deduplication logic we already have for the AWS shadow topics. Unfortunately there isn't a nice way to naturally avoid loops with MQTT v3 since there is no way to opt out of receiving messages you published (assuming you are subscribed to the relevant topic). So to detect and avoid the loops, the bridge needs to know the topic is bidirectional, then cache the relevant publish packet until we receive the message back from the broker, at which point we can immediately acknowledge the message and not forward it to the cloud.

We can probably fix things in this (relatively) simple case by telling the bridge to treat the cloud topics as bidirectional but not (directly) subscribe to them locally. For testing this works against mosquitto, forwarding the cloud topics bidirectionally (using tc.forward_bidirectionally) and subscribing separately to # should be fine, since mosquitto will essentially ignore the double subscription (but this would be problematic with nanomq).

In short, provided we don't have complex overlapping rules, it should be quite possible to make this just work.

Contributor Author

@albinsuresh albinsuresh Jul 10, 2025

the bridge needs to know the topic is bidirectional, then cache the relevant publish packet until we receive the message back from the broker, at which point we can immediately acknowledge the message and not forward it to the cloud.

That sounds fine. But to be on the safer side with that caching (esp. with half-open connections), it'd be better if the cached entries also had a time-to-live, after which they are cleared whether or not the duplicate is received from the cloud. Sure, this might lead to a late duplicate message being resent to the cloud once more, but that should settle after a few loops once the network stabilises.
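
The cache-with-TTL idea above could be sketched roughly like this (a hypothetical illustration, not thin-edge's actual bridge code; the class and method names are invented):

```python
import time


class BridgeDedupCache:
    """Sketch of the loop-breaking cache discussed above: when the bridge
    forwards a message on a bidirectional topic, it remembers (topic, payload)
    with a TTL; if the same message echoes back from the other broker before
    the TTL expires, it is dropped instead of being forwarded again."""

    def __init__(self, ttl_seconds=30.0, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable for testing
        self._cache = {}  # (topic, payload) -> expiry timestamp

    def on_forward(self, topic, payload):
        # Called when the bridge forwards a message to the other broker.
        self._cache[(topic, payload)] = self.clock() + self.ttl

    def should_drop(self, topic, payload):
        # Called when a message arrives from the other broker. Returns True
        # if it is the echo of a message we forwarded, so it must not loop.
        expiry = self._cache.pop((topic, payload), None)
        if expiry is None:
            return False
        if self.clock() > expiry:
            # Entry outlived its TTL: treat as a fresh message. This may cause
            # one extra round trip, as noted above, but bounds cache growth.
            return False
        return True
```

A first `should_drop` for a forwarded message returns True (echo suppressed); a second call returns False because the entry is consumed, so a genuinely re-published identical message still gets through.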

We can probably fix things in this (relatively) simple case by telling the bridge to treat the cloud topics as bidirectional but not (directly) subscribe to them locally

I didn't understand that proposal. How does a bidirectional subscription to the same topic (e.g. demo/topic) on both the cloud and the local broker help? Especially since the expectation on the local broker is to bridge messages on c8y-mqtt/demo/topic (included in c8y-mqtt/#) and not demo/topic directly. Could you explain where/how the loop is broken when a message is first received from the cloud on demo/topic?

Contributor

If we do a bidirectional forward, this tells the bridge to break loops on those messages. It can have different prefixes on local and cloud topics, which is exactly how the AWS shadow topic works at the moment.

Because the bridge knows in advance the topic will cause a loop, it can cache the published message when it forwards to local/cloud and then use that cached message to ignore the same message when the bridge receives the message from the target broker.

The only difference from the AWS shadow topics is that we have a wildcard subscription on local and not the cloud so if we add bidirectional rules to perform de-duplication for the cloud topics, we would then subscribe to c8y-mqtt/# on the local broker and duplicate all of those subscriptions. This isn't going to cause a problem with mosquitto, so should be a valid way to verify that the existing logic works, but we will need to ensure we don't have duplicate subscriptions in the final version as that will misbehave with NanoMQ.

Contributor

  1. Use different prefixes to bridge outbound and inbound messages

I see this as the simplest and most robust route.

Contributor

@reubenmiller reubenmiller Jul 15, 2025

But still using a common prefix to group the c8y-mqtt messages together, just adding two nested topics:

  • c8y-mqtt/in
  • c8y-mqtt/out
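
In mosquitto bridge terms, the split in/out prefixes could look something like the following (a hypothetical sketch, not the config this PR generates; the address and topics are placeholders, and mosquitto's `topic` directive takes `pattern direction qos local-prefix remote-prefix`):

```
# Hypothetical mosquitto bridge snippet illustrating split prefixes
connection c8y-mqtt-service
address mqtt.your-tenant.cumulocity.com:9883

# outbound: anything published locally under c8y-mqtt/out/ goes to the cloud
topic # out 1 c8y-mqtt/out/ ""

# inbound: cloud messages on subscribed topics arrive under c8y-mqtt/in/
topic sub/topic in 1 c8y-mqtt/in/ ""
topic demo/topic in 1 c8y-mqtt/in/ ""
```

Because the outbound wildcard only matches c8y-mqtt/out/# and inbound messages land under c8y-mqtt/in/, a message received from the cloud can never re-match the outbound rule, which is what breaks the loop without any deduplication cache.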

Contributor Author

Fixed in b25610a as @jarhodes314 suggested, since that was also a trivial one-liner.


/// MQTT service endpoint for the Cumulocity tenant, with optional port.
#[tedge_config(example = "mqtt.your-tenant.cumulocity.com:9883")]
#[tedge_config(default(from_optional_key = "c8y.url"))]
Contributor

Is this not better derived from c8y.mqtt, which will likely already have the correct domain for MQTT connections (I'm assuming this is the same issue as for standard MQTT connections)?

Contributor Author

Resolved by 015ee3d


@reubenmiller reubenmiller added the theme:c8y Theme: Cumulocity related topics label Jul 15, 2025
Contributor

github-actions bot commented Jul 16, 2025

Robot Results

✅ Passed: 654   ❌ Failed: 2   ⏭️ Skipped: 3   Total: 656   Pass %: 99.70   ⏱️ Duration: 1h49m40.396858999s

Failed Tests

  • Run shell custom operation for main device and publish the status — Matching messages on topic 'c8y/s/us' is greater than maximum. wanted: 2 got: 4 messages: ['504,20137275', '506,20137275,"helloworld1\n"', '504,20137275', '506,20137275,"helloworld1\n"'] (34.845 s, suite: Custom Operation Command)
  • Can use PKCS11 key to renew the public certificate — tedge cert renew c8y returned an unexpected exit code stdout: stderr: Error: failed to renew the device certificate via Cumulocity HTTP proxy http://127.0.0.1:8001 Caused by: The request failed with 400 Bad Request: {"message":"Invalid PKCS#10 signature.","error":"Certificate Authority/Bad Request","info":"https://cumulocity.com/api/core/"} (76.843 s, suite: Private Key Storage)


You can customize the documentation and commands shown on this page by providing relevant settings which will be reflected in the instructions. It makes it even easier to explore and use %%te%%.

<UserContextForm settings="DEVICE_ID,C8Y_URL,C8Y_TENANT_ID,C8Y_USERNAME,C8Y_PASSWORD" />
Contributor Author

@albinsuresh albinsuresh Jul 16, 2025

Accompanying tedge-docs PR that introduces the keys C8Y_TENANT_ID and C8Y_PASSWORD.

@albinsuresh
Contributor Author

albinsuresh commented Jul 18, 2025

Here are some of the rough edges around this feature in its current state:

  1. When an existing user who is already connected to c8y wants to try the mqtt service feature, they are forced to reconnect, since the mqtt service connection is part of the same tedge connect c8y flow. Though this was done to enable a seamless under-the-hood migration from core mqtt to the mqtt service in the future, during this transition phase it could be a bit annoying to some.

  2. The tedge connect c8y command is not very explicit about 2 connections being established to 2 different endpoints, except for the following line in the output: Creating mosquitto bridge to MQTT service... ✓. Again, something we intentionally did so as not to be too explicit about a secondary connection being established, but for early adopters of the mqtt service this might be too little.

  3. The tedge connect c8y --test currently relies on the health status message of the mqtt service bridge to determine whether it's healthy, unlike the core mqtt bridge test, which really sends the JWT request message and waits for the response. Since the mqtt service has no such built-in request/response topic pairs, we were left with no choice but to rely on the health status alone. The issue with this approach is that the health status is not updated immediately on events such as a network disconnection, as it can take a while (the ping interval) for the bridge to detect it.

     The saving grace here is that we check both the core mqtt connection and the mqtt service connection together, so for issues like a network disconnection, the JWT token check would detect the problem anyway and fail the c8y --test. It is only an issue when the connection to the mqtt service alone is broken while the core mqtt connection is still intact.

  4. Even though the mqtt service rejects subscription requests to certain reserved topics like s/ds, if such a topic is included in the c8y.mqtt_service.topics list along with others, the bridge still gets established with the problematic topic silently ignored. This lack of feedback is a bit problematic. With the mosquitto bridge, only the problematic topics are ignored; with the built-in bridge, all the topics are ignored. So effectively you'd end up with a bridge that appears healthy, but some or none of the subscriptions are working.

Labels
theme:c8y Theme: Cumulocity related topics

4 participants